Tasks
Learn how to create and schedule automated tasks to enhance your trading system.
Overview
Tasks are automated functions that run independently of your trading strategies. They can be used for maintenance, data collection, reporting, monitoring, and other background operations that support your trading system.
Creating Tasks
Basic Task Structure
from investing_algorithm_framework import Task
class DataCleanupTask(Task):
    """Housekeeping task that cleans up old market data and compacts the database."""

    def __init__(self):
        # Scheduled daily at 02:00 (2 AM).
        super().__init__(name="data_cleanup", interval="daily", time="02:00")

    def run(self, algorithm):
        """Run both housekeeping steps in order, printing progress around them."""
        print("Running data cleanup task...")
        self.cleanup_old_market_data()  # step 1: drop old data
        self.compact_database()  # step 2: compact the database
        print("Data cleanup completed")

    def cleanup_old_market_data(self):
        """Placeholder for the data cleanup implementation."""
        return None

    def compact_database(self):
        """Placeholder for the database optimization implementation."""
        return None
Registering Tasks
from investing_algorithm_framework import create_app
# Create the application instance
app = create_app()
# Register the task (an instance of the DataCleanupTask class shown in
# "Basic Task Structure")
app.add_task(DataCleanupTask())
# Start the app (tasks will run automatically)
app.start()
Task Scheduling
Schedule Types
Fixed Intervals:
class MarketDataTask(Task):
    """Example of a fixed-interval task: runs every 5 minutes."""

    def __init__(self):
        super().__init__(name="market_data_collection", interval="5m")
class PortfolioReportTask(Task):
    """Example of a fixed-interval task: runs every hour."""

    def __init__(self):
        super().__init__(name="portfolio_report", interval="1h")
class BackupTask(Task):
    """Example of a daily task pinned to a time of day (23:30)."""

    def __init__(self):
        super().__init__(name="daily_backup", interval="daily", time="23:30")
Cron-style Scheduling:
class WeeklyReportTask(Task):
    """Example of a cron-scheduled task: every Monday at 9 AM."""

    def __init__(self):
        # Cron fields: minute hour day-of-month month day-of-week
        super().__init__(name="weekly_report", cron="0 9 * * MON")
class MonthlyRebalanceTask(Task):
    """Example of a cron-scheduled task: first day of each month."""

    def __init__(self):
        # Cron fields: minute hour day-of-month month day-of-week
        super().__init__(name="monthly_rebalance", cron="0 0 1 * *")
Common Task Examples
Market Data Collection
class MarketDataCollector(Task):
    """Every minute, fetch the latest data for each symbol and store it."""

    def __init__(self, symbols, data_provider):
        super().__init__(name="market_data_collector", interval="1m")
        self.symbols = symbols
        self.data_provider = data_provider

    def run(self, algorithm):
        """Collect and store market data for every configured symbol.

        A failure for one symbol is printed and does not prevent the
        remaining symbols from being processed.
        """
        for ticker in self.symbols:
            try:
                latest = self.data_provider.get_latest_data(ticker)
                algorithm.store_market_data(ticker, latest)
                print(f"Collected data for {ticker}")
            except Exception as exc:
                print(f"Failed to collect data for {ticker}: {exc}")
Portfolio Monitoring
class PortfolioMonitor(Task):
    """Check overall portfolio P&L and individual position sizes every 5 minutes.

    Class attributes (override in a subclass to tune the alerts):
        LOSS_ALERT_PCT: P&L percentage below which a WARNING alert is sent.
        GAIN_ALERT_PCT: P&L percentage above which an INFO alert is sent.
        LARGE_POSITION_VALUE: position value above which a notice is printed.
    """

    LOSS_ALERT_PCT = -10
    GAIN_ALERT_PCT = 20
    LARGE_POSITION_VALUE = 5000

    def __init__(self, alert_manager):
        super().__init__(name="portfolio_monitor", interval="5m")
        self.alert_manager = alert_manager

    def run(self, algorithm):
        """Monitor portfolio health and send alerts."""
        portfolio = algorithm.get_portfolio()
        positions = algorithm.get_positions()

        total_value = portfolio.get_total_value()
        initial_value = portfolio.get_initial_value()

        # The P&L percentage is undefined for a zero initial value; skip the
        # check instead of raising ZeroDivisionError so that the position
        # checks below still run.
        if initial_value:
            pnl_percentage = (total_value - initial_value) / initial_value * 100

            if pnl_percentage < self.LOSS_ALERT_PCT:
                self.alert_manager.send_alert(
                    f"Portfolio down {abs(pnl_percentage):.2f}%",
                    severity="WARNING",
                )
            elif pnl_percentage > self.GAIN_ALERT_PCT:
                self.alert_manager.send_alert(
                    f"Portfolio up {pnl_percentage:.2f}%",
                    severity="INFO",
                )

        self.check_position_alerts(positions)

    def check_position_alerts(self, positions):
        """Print a notice for every position valued above LARGE_POSITION_VALUE."""
        for position in positions:
            if position.current_value > self.LARGE_POSITION_VALUE:
                print(f"Large position alert: {position.symbol} = ${position.current_value:.2f}")
Performance Reporting
from datetime import datetime


class PerformanceReporter(Task):
    """Build a daily performance report at 18:00 and optionally email it.

    Args:
        report_email: Address the report is sent to. When falsy, the report
            is generated but not sent.
    """

    def __init__(self, report_email=None):
        super().__init__(
            name="performance_reporter",
            interval="daily",
            time="18:00",  # 6 PM daily
        )
        self.report_email = report_email

    def run(self, algorithm):
        """Generate and send daily performance report."""
        daily_metrics = self.calculate_daily_metrics(algorithm)
        report = self.generate_report(daily_metrics)

        if self.report_email:
            self.send_report_email(report)

        print("Daily performance report generated")

    def calculate_daily_metrics(self, algorithm):
        """Return a dict of today's metrics computed from the algorithm's trades.

        Keys: ``portfolio_value``, ``daily_trades``, ``daily_volume``,
        ``daily_fees`` and ``open_positions``.
        """
        portfolio = algorithm.get_portfolio()
        trades = algorithm.get_trades()

        # Keep only the trades created today (local date).
        today = datetime.now().date()
        today_trades = [t for t in trades if t.created_at.date() == today]

        return {
            "portfolio_value": portfolio.get_total_value(),
            "daily_trades": len(today_trades),
            "daily_volume": sum(t.cost for t in today_trades),
            "daily_fees": sum(t.fee for t in today_trades),
            "open_positions": len(algorithm.get_positions()),
        }

    def generate_report(self, metrics):
        """Format *metrics* (as returned by calculate_daily_metrics) as text."""
        # Sample the clock once so the title date and the "Generated at"
        # timestamp cannot disagree when the report is built around midnight.
        now = datetime.now()
        lines = [
            "",
            f"Daily Trading Report - {now.strftime('%Y-%m-%d')}",
            "=====================================================",
            f"Portfolio Value: ${metrics['portfolio_value']:,.2f}",
            "Daily Activity:",
            f"- Trades: {metrics['daily_trades']}",
            f"- Volume: ${metrics['daily_volume']:,.2f}",
            f"- Fees: ${metrics['daily_fees']:,.2f}",
            f"- Open Positions: {metrics['open_positions']}",
            f"Generated at: {now.strftime('%Y-%m-%d %H:%M:%S')}",
            "",
        ]
        return "\n".join(lines)

    def send_report_email(self, report):
        """Send *report* to ``self.report_email`` (placeholder)."""
        # Implementation for delivering the report by email
        pass
Risk Management
class RiskManager(Task):
    """Check drawdown and position-size limits every minute and act on breaches.

    Args:
        max_drawdown: Largest tolerated drop from the portfolio's peak value,
            as a fraction (0.15 = 15%).
        max_position_size: Largest tolerated weight of a single position in
            the portfolio, as a fraction (0.1 = 10%).
    """

    def __init__(self, max_drawdown=0.15, max_position_size=0.1):
        super().__init__(
            name="risk_manager",
            interval="1m",  # Check risk every minute
        )
        self.max_drawdown = max_drawdown
        self.max_position_size = max_position_size

    def run(self, algorithm):
        """Monitor and enforce risk limits."""
        self.check_drawdown(algorithm)
        self.check_position_sizes(algorithm)
        self.check_correlation_risk(algorithm)

    def check_drawdown(self, algorithm):
        """Trigger emergency risk reduction if drawdown exceeds ``max_drawdown``."""
        portfolio = algorithm.get_portfolio()
        peak_value = portfolio.get_peak_value()
        current_value = portfolio.get_total_value()

        # Drawdown is undefined without a non-zero peak; skip instead of
        # raising ZeroDivisionError.
        if not peak_value:
            return

        drawdown = (peak_value - current_value) / peak_value
        if drawdown > self.max_drawdown:
            self.reduce_risk(algorithm, f"Drawdown {drawdown:.2%} exceeds limit")

    def check_position_sizes(self, algorithm):
        """Sell the excess of every position heavier than ``max_position_size``."""
        portfolio = algorithm.get_portfolio()
        positions = algorithm.get_positions()
        total_value = portfolio.get_total_value()

        # Position weights are undefined for a zero-value portfolio.
        if not total_value:
            return

        for position in positions:
            position_weight = position.current_value / total_value
            if position_weight > self.max_position_size:
                # The order targets the part of the symbol before '/'.
                target_symbol = position.symbol.split('/')[0]
                excess_percentage = position_weight - self.max_position_size
                algorithm.create_sell_order(
                    target_symbol=target_symbol,
                    # Fraction of this position to sell so that its weight
                    # falls back to the limit.
                    percentage=excess_percentage / position_weight,
                    order_type="MARKET",
                )
                print(f"Reduced oversized position: {position.symbol}")

    def check_correlation_risk(self, algorithm):
        """Check correlation exposure across positions (placeholder)."""
        # Implementation for correlation analysis
        pass

    def reduce_risk(self, algorithm, reason):
        """Emergency risk reduction: cancel open orders and halve every position."""
        print(f"RISK ALERT: {reason}")
        print("Implementing risk reduction measures...")

        algorithm.cancel_all_orders()

        # Reduce position sizes by 50%
        for position in algorithm.get_positions():
            target_symbol = position.symbol.split('/')[0]
            algorithm.create_sell_order(
                target_symbol=target_symbol,
                percentage=0.5,
                order_type="MARKET",
            )

        print("Risk reduction completed")
Database Maintenance
from datetime import datetime, timedelta


class DatabaseMaintenanceTask(Task):
    """Archive old data, optimize and back up the database daily at 03:00.

    Args:
        retention_days: Trades and market data older than this many days are
            archived.
        backup_dir: Directory prefix used to build the backup file path.
    """

    def __init__(self, retention_days=365, backup_dir="/backups"):
        super().__init__(
            name="database_maintenance",
            interval="daily",
            time="03:00",  # 3 AM daily
        )
        self.retention_days = retention_days
        self.backup_dir = backup_dir

    def run(self, algorithm):
        """Perform database maintenance tasks."""
        print("Starting database maintenance...")
        self.archive_old_data(algorithm)
        self.optimize_database(algorithm)
        self.backup_database(algorithm)
        print("Database maintenance completed")

    def archive_old_data(self, algorithm):
        """Archive trades and market data older than ``retention_days``."""
        cutoff_date = datetime.now() - timedelta(days=self.retention_days)

        old_trades = algorithm.get_trades(before_date=cutoff_date)
        if old_trades:
            algorithm.archive_trades(old_trades)
            print(f"Archived {len(old_trades)} old trades")

        algorithm.archive_market_data(before_date=cutoff_date)

    def optimize_database(self, algorithm):
        """Rebuild the database indices and refresh its statistics."""
        algorithm.rebuild_database_indices()
        algorithm.update_database_statistics()
        print("Database optimization completed")

    def backup_database(self, algorithm):
        """Create a timestamped database backup under ``backup_dir``."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_path = f"{self.backup_dir}/trading_db_backup_{timestamp}.sql"
        algorithm.backup_database(backup_path)
        print(f"Database backed up to {backup_path}")
Task Management
Task Monitoring
from datetime import datetime, timedelta


class TaskMonitor(Task):
    """Every 10 minutes, report tasks that failed or have not run recently.

    Args:
        overdue_after: A task whose last run is older than this is reported
            as overdue.
    """

    def __init__(self, overdue_after=timedelta(hours=2)):
        super().__init__(name="task_monitor", interval="10m")
        self.overdue_after = overdue_after
        self.task_history = {}

    def run(self, algorithm):
        """Monitor other tasks for failures."""
        task_status = algorithm.get_task_status()
        now = datetime.now()

        for task_name, status in task_status.items():
            if status.get('last_error'):
                print(f"Task {task_name} failed: {status['last_error']}")

            # Tasks with no recorded run are skipped by the overdue check.
            last_run = status.get('last_run')
            if last_run and now - last_run > self.overdue_after:
                print(f"Task {task_name} is overdue (last run: {last_run})")
Conditional Tasks
class ConditionalRebalanceTask(Task):
    """Hourly task that rebalances only when weights drift from their targets.

    Args:
        target_weights: Mapping of asset symbol to target portfolio weight.
            Defaults to ``DEFAULT_TARGET_WEIGHTS``.
        tolerance: Absolute weight deviation above which a rebalance is
            triggered (0.05 = 5 percentage points).
    """

    DEFAULT_TARGET_WEIGHTS = {"BTC": 0.5, "ETH": 0.3, "ADA": 0.2}

    def __init__(self, target_weights=None, tolerance=0.05):
        super().__init__(name="conditional_rebalance", interval="1h")
        # Copy so that later edits to the caller's dict (or to the class
        # default) do not silently change this task's targets.
        self.target_weights = dict(
            self.DEFAULT_TARGET_WEIGHTS if target_weights is None else target_weights
        )
        self.tolerance = tolerance

    def run(self, algorithm):
        """Only rebalance if conditions are met."""
        if not self.should_rebalance(algorithm):
            return

        print("Rebalancing conditions met - executing rebalance")
        self.execute_rebalance(algorithm)

    def should_rebalance(self, algorithm):
        """Return True if any target asset deviates by more than ``tolerance``."""
        positions = algorithm.get_positions()
        portfolio = algorithm.get_portfolio()
        total_value = portfolio.get_total_value()

        for symbol, target_weight in self.target_weights.items():
            # Match on the part of the symbol before '/' exactly; a prefix
            # test would let "BTC" match a different asset whose symbol
            # merely starts with "BTC".
            position = next(
                (p for p in positions if p.symbol.split('/')[0] == symbol),
                None,
            )
            # A missing position or a zero-value portfolio counts as weight 0
            # (the latter would otherwise raise ZeroDivisionError).
            if position and total_value:
                current_weight = position.current_value / total_value
            else:
                current_weight = 0

            if abs(current_weight - target_weight) > self.tolerance:
                return True

        return False

    def execute_rebalance(self, algorithm):
        """Execute portfolio rebalancing (placeholder)."""
        # Implementation for rebalancing logic
        pass
Best Practices
1. Error Handling
import time


class RobustTask(Task):
    """Task that retries its logic a fixed number of times before giving up."""

    def __init__(self):
        super().__init__(name="robust_task", interval="5m")
        self.max_retries = 3
        self.retry_delay = 30  # seconds

    def run(self, algorithm):
        """Run task with error handling and retries.

        Calls ``execute_task_logic`` up to ``max_retries`` times, sleeping
        ``retry_delay`` seconds between attempts. After the final failed
        attempt the last exception is passed to ``handle_task_failure``.
        """
        for attempt in range(1, self.max_retries + 1):
            try:
                self.execute_task_logic(algorithm)
                return  # Success, exit retry loop
            except Exception as e:
                print(f"Task attempt {attempt} failed: {e}")
                if attempt < self.max_retries:
                    time.sleep(self.retry_delay)
                else:
                    print(f"Task failed after {self.max_retries} attempts")
                    self.handle_task_failure(e)

    def execute_task_logic(self, algorithm):
        """Main task logic that might fail (placeholder)."""
        # Implementation here
        pass

    def handle_task_failure(self, error):
        """Handle permanent task failure (placeholder)."""
        # Log error, send alerts, etc.
        pass
2. Resource Management
class ResourceAwareTask(Task):
    """Task that skips its work when the machine is short on memory or CPU."""

    def run(self, algorithm):
        """Run the heavy computation only if resources allow it."""
        if not self.has_sufficient_resources():
            print("Insufficient resources - skipping task execution")
            return

        self.execute_heavy_computation()

    def has_sufficient_resources(self):
        """Return False when memory usage is above 90% or CPU usage above 95%."""
        # Imported here so psutil is only needed when the check actually runs.
        import psutil

        memory_percent = psutil.virtual_memory().percent
        if memory_percent > 90:
            return False

        # interval=1 makes psutil measure CPU usage over one second, so this
        # call blocks for that long.
        cpu_percent = psutil.cpu_percent(interval=1)
        if cpu_percent > 95:
            return False

        return True

    def execute_heavy_computation(self):
        """Resource-intensive task logic (placeholder)."""
        # Implementation here
        pass
3. Task Dependencies
from datetime import datetime, timedelta


class DependentTask(Task):
    """Hourly task that runs only after its prerequisite tasks succeeded recently.

    Args:
        prerequisite_tasks: Names of the tasks that must have run without
            error before this task executes.
        max_age: A prerequisite whose last run is older than this counts as
            not met.
    """

    def __init__(self, prerequisite_tasks, max_age=timedelta(hours=2)):
        super().__init__(name="dependent_task", interval="1h")
        self.prerequisite_tasks = prerequisite_tasks
        self.max_age = max_age

    def run(self, algorithm):
        """Only run if prerequisite tasks completed successfully."""
        if not self.prerequisites_met(algorithm):
            print("Prerequisites not met - skipping task")
            return

        self.execute_task(algorithm)

    def prerequisites_met(self, algorithm):
        """Return True if every prerequisite ran recently and without error."""
        task_status = algorithm.get_task_status()
        now = datetime.now()

        for prereq_task in self.prerequisite_tasks:
            # A prerequisite with no status entry has no 'last_run' and is
            # therefore treated as not met.
            status = task_status.get(prereq_task, {})
            if status.get('last_error'):
                return False

            last_run = status.get('last_run')
            if not last_run or (now - last_run) > self.max_age:
                return False

        return True

    def execute_task(self, algorithm):
        """Main task logic (placeholder)."""
        # Implementation here
        pass
Next Steps
Tasks provide powerful automation capabilities for your trading system. Next, learn about Backtesting to test your strategies and tasks against historical data before deploying them live.